package org.voovan.korla.rpc.service;

import org.voovan.korla.exception.KorlaException;
import org.voovan.korla.message.Callback;
import org.voovan.korla.message.Msg;
import org.voovan.korla.rpc.RpcMethod;
import org.voovan.korla.rpc.exception.RpcCallbackException;
import org.voovan.korla.rpc.exception.RpcException;
import org.voovan.korla.rpc.message.RpcCall;
import org.voovan.korla.rpc.message.RpcCallback;
import org.voovan.korla.rpc.message.RpcReturn;
import org.voovan.korla.socket.KorlaConsumerPool;
import org.voovan.korla.socket.KorlarConsumer;
import org.voovan.tools.TEnv;
import org.voovan.tools.TFile;
import org.voovan.tools.TString;
import org.voovan.tools.collection.MultiMap;
import org.voovan.tools.json.JSON;
import org.voovan.tools.log.Logger;
import org.voovan.tools.reflect.ClassModel;
import org.voovan.tools.reflect.TReflect;

import java.io.File;
import java.util.*;
import java.util.concurrent.TimeoutException;

import static org.voovan.korla.rpc.RpcStatic.*;

/**
 * Rpc 消费者
 *
 * @author: helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class RpcConsumer {
    static {
        registerClass();
    }

    private transient KorlaConsumerPool korlaConsumerPool;

    /**
     * 构造方法
     * @param korlaConsumerPool Korla 消费者管理器
     */
    public RpcConsumer(KorlaConsumerPool korlaConsumerPool) {
        this.korlaConsumerPool = korlaConsumerPool;
    }

    /**
     * 获取 Korla 消费者管理器
     * @return  Korla 连接管理器对象
     */
    public KorlaConsumerPool getKorlaConsumerPool() {
        return korlaConsumerPool;
    }

    /**
     * 调用 RPC 方法
     * @param rpcCall RpcCall 对象
     * @param timeoutMillis 超时时间(消费者管理器获取消费者/消息响应获取)
     * @param <T> 响应类型的范型
     * @return Rpc 调用的响应对象
     * @throws TimeoutException 超时时间
     */
    public <T> T call(RpcCall rpcCall, Integer timeoutMillis) throws TimeoutException, RpcCallbackException {
        KorlarConsumer korlarConsumer = null;
        try {
            korlarConsumer = korlaConsumerPool.getConsumer(timeoutMillis);
            korlarConsumer.send(rpcCall);

            Msg msg = rpcCall.getResponse(timeoutMillis);
            if (msg instanceof RpcReturn) {
                RpcReturn<T> rpcReturn = ((RpcReturn) msg);
                if (rpcReturn.getReturnType() == RpcReturn.RPC_RETURN_VALUE) {
                    return rpcReturn.getReturnValue();
                } else {
                    throw new RpcCallbackException("Provider has exception, " +
                            "[name=" + rpcReturn.getException()[0] + ", id=" + rpcReturn.getId() + "] \r\n" +
                            "Provider exception: " + rpcReturn.getException()[1], rpcReturn.getException()[1]);
                }
            } else {
                throw new RpcException("RpcInvoke.invoke(\"" + rpcCall.getRpcMethodName() + "\") failed, invoke's return is not instance of RpcReturn.class \r\n" + JSON.toJSON(msg));
            }
        } finally {
            if(korlarConsumer!=null) {
                korlaConsumerPool.restitution(korlarConsumer);
            }
        }
    }

    /**
     * 调用 RPC 方法
     * @param rpcCall RpcCall 对象
     * @param <T> 响应类型的范型
     * @return Rpc 调用的响应对象
     * @throws TimeoutException 超时时间
     */
    public <T> T call(RpcCall rpcCall) throws TimeoutException, RpcCallbackException {
        return call(rpcCall, CALL_TIMEOUT);
    }

    /**
     * 调用 RPC 方法
     * @param rpcCall RpcCall 对象
     * @param timeoutMillis 超时时间(消费者管理器获取消费者/消息响应获取)
     * @param rpcCallback rpc 响应回调处理类
     * @param <T> 响应类型的范型
     */
    public <T> void call(RpcCall rpcCall, Integer timeoutMillis, RpcCallback<T> rpcCallback) {
        KorlarConsumer korlarConsumer = null;
        try {
            korlarConsumer = korlaConsumerPool.getConsumer(timeoutMillis);
            rpcCall = rpcCallback.prepare(rpcCall);
            String node = rpcCall.getNode();
            rpcCall.setCallBack(new Callback() {
                @Override
                public Msg apply(Msg msg) {
                    if (msg instanceof RpcReturn) {
                        RpcReturn<T> rpcReturn = ((RpcReturn) msg);
                        if (rpcReturn.getReturnType() == RpcReturn.RPC_RETURN_VALUE) {
                            if (rpcCallback != null) {
                                rpcCallback.onReturn(rpcReturn.getReturnValue());
                            } else if (rpcReturn != null) {
                                Logger.warnf("[RPC] The return value not handler. please set the onReturn function on call method, on node {}", node);
                            }
                        } else {
                            RpcCallbackException rpcCallbackException = new RpcCallbackException("Provider has exception, " +
                                    "[name=" + rpcReturn.getException()[0] + ", id=" + rpcReturn.getId() + "] \r\n" +
                                    "Provider exception: " + rpcReturn.getException()[1] + ", on node " + node, rpcReturn.getException()[1]);
                            if (rpcCallback == null) {
                                rpcCallbackException.printStackTrace();
                            } else {
                                rpcCallback.onException(rpcCallbackException);
                            }
                        }
                    } else {
                        throw new RpcException("RpcInvoke.invoke failed, invoke's return is not instance of RpcReturn.class, on node " + node);
                    }

                    return null;
                }
            });

            korlarConsumer.send(rpcCall);
        } catch (TimeoutException e) {
            rpcCallback.onTimeout(e);
        } finally {
            if(korlarConsumer!=null) {
                korlaConsumerPool.restitution(korlarConsumer);
            }
        }
    }

    /**
     * 调用 RPC 方法
     * @param rpcCall RpcCall 对象
     * @param rpcCallback rpc 响应回调处理类
     * @param <T> 响应类型的范型
     */
    public <T> void call(RpcCall rpcCall, RpcCallback<T> rpcCallback) {
        call(rpcCall, CALL_TIMEOUT, rpcCallback);
    }

    /**
     * 构造 RpcConsumer 消费类
     * @param clazz Rpc 消费类对象(由 buildConsumeClass 自动程程的类)
     * @param <C> Rpc 消费类范型
     * @return Rpc 消费类的对象实例
     */
    public <C> C build(Class<C> clazz) {
        try {
            if(TReflect.isSuper(clazz,RpcService.class)) {
                return (C) TReflect.newInstance(clazz, this);
            } else {
                throw new KorlaException("Class " + clazz.getCanonicalName() + " is not implement by org.voovan.korla.rpc.service.RpcService");
            }
        } catch (Exception e) {
            Logger.error("Build RPC service failed", e);
            return null;
        }
    }

    /**
     * 创建消费 Class 的代码
     * @param sourceRootPath 代码根路径
     */
    public void buildConsumeClass(String sourceRootPath){
        buildConsumeClass(sourceRootPath, null);
    }

    /**
     * 创建消费 Class 的代码
     *
     * @param sourceRootPath 代码根路径
     * @param packagePath 代码保存的包路径, 类似: org.voovan.korla.rpc.consume
     */
    public void buildConsumeClass(String sourceRootPath, String packagePath){
        packagePath = packagePath == null ? CONSUMER_CODE_PACKAGE : packagePath;
        try {
            String modelPackage = packagePath + ".model";
            String targetSourcePath = sourceRootPath + File.separator + packagePath.replaceAll("\\.", File.separator);
            String targetModelSourcePath = sourceRootPath + File.separator + modelPackage.replaceAll("\\.", File.separator);

            List<RpcMethod> rpcMethodList = this.call(RpcCall.newInstance("RpcCommon/getAllRpcMethod", new Object[0]));

            //按 Class 对 RpcMethod 归类
            MultiMap<String, RpcMethod> classAndMethod = new MultiMap<String, RpcMethod>();
            Map<String, String> totalClassModel = new HashMap<String, String>();

            for(RpcMethod rpcMethod : rpcMethodList) {
                String className = rpcMethod.getClassName();
                //方法按所属的类分类
                classAndMethod.putValue(className, rpcMethod);

                //归集所有模型定义
                totalClassModel.putAll(rpcMethod.getClassModel());
            }

            //1.模型代码生成
            if(totalClassModel.size()>0) {
                String consumerModelTemplate = new String(TFile.loadResource("ConsumeModel.template"));
                //构造 Class 代码 ( 通过模板 )
                for(Map.Entry<String, String> classModelEntry : totalClassModel.entrySet()) {
                    String originClassName = ClassModel.getClassName(classModelEntry.getValue());
                    //如果类不存在则简历模型类
                    if(!TEnv.isClassExists(originClassName)) {
                        String className = ClassModel.getSimpleName(classModelEntry.getValue());

                        File classFile = new File(targetModelSourcePath + File.separator + className + ".java");

                        String modelSourceTemplate = consumerModelTemplate;
                        modelSourceTemplate = TString.oneTokenReplace(modelSourceTemplate, "package", modelPackage);
                        modelSourceTemplate = TString.oneTokenReplace(modelSourceTemplate, "code", ClassModel.buildClass(classModelEntry.getValue()));
                        if (classFile.exists()) {
                            TFile.deleteFile(classFile);
                        }

                        //检查并创建目录
                        TFile.mkdir(targetModelSourcePath);
                        //保存代码
                        TFile.writeFile(classFile, true, modelSourceTemplate.getBytes());
                        Logger.simplef("Model class [{}] source write to {}", className, classFile.getCanonicalPath());
                    }
                }
            }

            //2. Rpc 类代码生成
            if(classAndMethod.size() > 0) {
                TFile.mkdir(targetSourcePath);
                String consumerClassTemplate = new String(TFile.loadResource("ConsumeClass.template"));

                boolean importModel = false;

                //2.1 构造 Class 代码 ( 通过模板 )
                for(Map.Entry<String,List<RpcMethod>> entry : classAndMethod.entrySet()) {
                    HashMap<String, String> oneClassModelMap = new HashMap<String, String>();

                    String className = entry.getKey();
                    File classFile = new File(targetSourcePath + File.separator + className + ".java");
                    String classSourceTemplate = consumerClassTemplate;

                    classSourceTemplate = TString.oneTokenReplace(classSourceTemplate, "package", packagePath);
                    classSourceTemplate = TString.oneTokenReplace(classSourceTemplate, "className", className);

                    //构造 RpcMethod 的调用方法代码
                    {
                        String methodCode = "";
                        for (RpcMethod rpcMethod : entry.getValue()) {
                            methodCode += rpcMethod.buildMethodCode(false);
                            methodCode += rpcMethod.buildMethodCode(true);
                            oneClassModelMap.putAll(rpcMethod.getClassModel());
                        }

                        //将 RpcMethod 调用方法代码替换到 Class 中
                        classSourceTemplate = TString.oneTokenReplace(classSourceTemplate, "methodCode", methodCode);
                    }

                    //2.2 添加模型的序列化注册
                    {
                        HashSet<String> importList = new HashSet<>();
                        StringBuilder initCode = new StringBuilder();
                        StringBuilder importCode = new StringBuilder();
                        for (Map.Entry<String, String> classModelEntry : oneClassModelMap.entrySet()) {
                            String originClassName = ClassModel.getClassName(classModelEntry.getValue());
                            //如果类不存在则注册模型类序列化,使 RpcProvicer 可以正常反序列化
                            if(!TEnv.isClassExists(originClassName)) {
                                importModel = true;

                                String classSimpleName = ClassModel.getSimpleName(classModelEntry.getValue());
                                //TSerialize.register(原始类现定名, 本地生成类限定名)
                                initCode.append("TSerialize.register(\"")
                                        .append(classModelEntry.getKey()).append("\", ")
                                        .append(packagePath).append(".model.").append(classSimpleName).append(".class);");

                            }
                            //如果类存在则直接 import
                            else {
                                importList.add(originClassName);
                            }
                        }

                        //2.3 是否导入模型包内所有的类
                        if(importModel) {
                            importList.add(modelPackage + ".*");
                        }

                        for(String importItem : importList) {
                            importCode.append("import ").append(importItem).append(";");
                        }

                        classSourceTemplate = TString.oneTokenReplace(classSourceTemplate, "importCode", importCode.toString());
                        classSourceTemplate = TString.oneTokenReplace(classSourceTemplate, "initCode", initCode.toString());
                    }

                    if(classFile.exists()) {
                        TFile.deleteFile(classFile);
                    }

                    //保存代码
                    TFile.writeFile(classFile, true, classSourceTemplate.getBytes());
                    Logger.simplef("Class [{}] source write to {}", className, classFile.getCanonicalPath());
                }
            }
        } catch (Exception e) {
            Logger.error("[Korla] buildConsumeClass has error", e);
        }
    }

    /**
     * 静态方法 Rpc 消费器
     * @param host Rpc 服务主机地址
     * @param port Rpc 服务主机端口
     * @param readTimeout 读超时时间
     * @param sendTimeout 写超时时间
     * @param poolSize 缓存连接的数量
     * @return RpcConsumer 对象
     */
    public static RpcConsumer newInstance(String host, int port, int readTimeout, int sendTimeout, int poolSize){
        return newInstance(host, port, readTimeout, sendTimeout, poolSize, poolSize);
    }

    /**
     * 静态方法 Rpc 消费器
     * @param host Rpc 服务主机地址
     * @param port Rpc 服务主机端口
     * @param readTimeout 读超时时间
     * @param sendTimeout 写超时时间
     * @param poolSize 缓存连接的数量
     * @param minPoolSize 最小容量
     * @param maxPoolSize 最大容量
     */
    public static RpcConsumer newInstance(String host, int port, int readTimeout, int sendTimeout, int minPoolSize, int maxPoolSize){
        KorlaConsumerPool korlaConsumerPool = new KorlaConsumerPool(host, port, readTimeout, sendTimeout, minPoolSize, maxPoolSize);
        return new RpcConsumer(korlaConsumerPool);
    }

    /**
     * 构造方法
     * @param korlaConsumerPool Korla 消费者管理器
     */
    public static RpcConsumer newInstance(KorlaConsumerPool korlaConsumerPool){
        return new RpcConsumer(korlaConsumerPool);
    }
}
